{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "8e1ec46b",
   "metadata": {},
   "source": [
    "# Defining a partition key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dcfccd0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import platform\n",
    "\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka._components._subprocess import terminate_asyncio_process\n",
    "from fastkafka._testing.apache_kafka_broker import run_and_match\n",
    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "61526c5c",
   "metadata": {},
   "source": [
    "Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance.\n",
    "\n",
    "You can define a partition key when using the `@produces` decorator; this guide shows how."
   ]
  },
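  {
   "cell_type": "markdown",
   "id": "b7c41e90",
   "metadata": {},
   "source": [
    "To make the idea concrete, here is a minimal sketch of how a key can be mapped to a partition. It is an illustration only: `pick_partition` is a hypothetical helper, and `zlib.crc32` stands in for the hash (Kafka's default partitioner uses a murmur2 hash instead). The point is that equal keys always land on the same partition.\n",
    "\n",
    "```python\n",
    "import zlib\n",
    "\n",
    "\n",
    "def pick_partition(key: bytes, num_partitions: int) -> int:\n",
    "    # Stand-in hash for illustration; not the hash Kafka itself uses.\n",
    "    return zlib.crc32(key) % num_partitions\n",
    "\n",
    "\n",
    "# The same key is always assigned the same partition.\n",
    "for key in [b\"user-1\", b\"user-2\", b\"user-1\"]:\n",
    "    print(key, \"->\", pick_partition(key, num_partitions=3))\n",
    "```"
   ]
  },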
  {
   "cell_type": "markdown",
   "id": "3237efbe",
   "metadata": {},
   "source": [
    "## Return a key from the producing function"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d0af82f7",
   "metadata": {},
   "source": [
    "To define a key for a message that you produce to a Kafka topic, wrap the response in the `KafkaEvent` class and set its `key` value. See the example below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "109cf37d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from fastkafka import KafkaEvent\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msg: str) -> KafkaEvent[HelloWorld]:\n",
       "    return KafkaEvent(HelloWorld(msg=msg), key=b\"my_key\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "hello_world_with_key = \"\"\"\n",
    "from fastkafka import KafkaEvent\n",
    "\n",
    "@app.produces()\n",
    "async def to_hello_world(msg: str) -> KafkaEvent[HelloWorld]:\n",
    "    return KafkaEvent(HelloWorld(msg=msg), key=b\"my_key\")\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{hello_world_with_key}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "034df146",
   "metadata": {},
   "source": [
    "In the example, we want to return the `HelloWorld` message with the key set to *my_key*, so we wrap the message and the key in a `KafkaEvent` and return that.\n",
    "\n",
    "When the documentation is generated, the `KafkaEvent` wrapper is unwrapped and `HelloWorld` is documented as the message type, just as it would be if you had not used a key."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d07451d1",
   "metadata": {},
   "source": [
    "!!! info \\\"Which key to choose?\\\"\n",
    "\n",
    "    Although we have defined a fixed key in this example, nothing is stopping you from calculating a key beforehand and passing it in, or using the message parts for key calculation. Just make sure that the key is in `bytes` format when you wrap it in `KafkaEvent`."
   ]
  },
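  {
   "cell_type": "markdown",
   "id": "4d2a9f6e",
   "metadata": {},
   "source": [
    "As a sketch of the note above, a key can be derived from a field of the message. The `Order` class and the `key_for` helper are hypothetical names used only for this illustration; the resulting `bytes` value is what you would pass as `key` when wrapping the message in `KafkaEvent`.\n",
    "\n",
    "```python\n",
    "from dataclasses import dataclass\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Order:\n",
    "    # Hypothetical message type, not part of the app in this guide.\n",
    "    customer_id: str\n",
    "    amount: float\n",
    "\n",
    "\n",
    "def key_for(order: Order) -> bytes:\n",
    "    # Keys must be bytes, so encode the string field explicitly.\n",
    "    return order.customer_id.encode(\"utf-8\")\n",
    "\n",
    "\n",
    "order = Order(customer_id=\"customer-42\", amount=9.99)\n",
    "print(key_for(order))  # prints b'customer-42'\n",
    "```\n",
    "\n",
    "Keying by a field such as a customer identifier keeps all messages for that customer in one partition, which preserves their relative order."
   ]
  },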
  {
   "cell_type": "markdown",
   "id": "84d30a25",
   "metadata": {},
   "source": [
    "## App example"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3e2c6ad",
   "metadata": {},
   "source": [
    "We will modify the app example from the **@produces basics** guide to return `HelloWorld` with our key. The final app will look like this (make sure you replace `<url_of_your_kafka_bootstrap_server>` and `<port_of_your_kafka_bootstrap_server>` with the actual values):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "673c7f8a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "app = \"\"\"\n",
    "from fastkafka import FastKafka\n",
    "from pydantic import BaseModel, Field\n",
    "\n",
    "class HelloWorld(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"demo_broker\": {\n",
    "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
    "        \"description\": \"local demo kafka broker\",\n",
    "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
    "    }\n",
    "}\n",
    "\n",
    "app = FastKafka(kafka_brokers=kafka_brokers)\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "11fba86b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "bg_run = \"\"\"\n",
    "import asyncio\n",
    "\n",
    "@app.run_in_background()\n",
    "async def hello_every_second():\n",
    "    while True:\n",
    "        await to_hello_world(msg=\"Hello world!\")\n",
    "        await asyncio.sleep(1)\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2abb4c3d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "from fastkafka import KafkaEvent\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msg: str) -> KafkaEvent[HelloWorld]:\n",
       "    return KafkaEvent(HelloWorld(msg=msg), key=b\"my_key\")\n",
       "\n",
       "import asyncio\n",
       "\n",
       "@app.run_in_background()\n",
       "async def hello_every_second():\n",
       "    while True:\n",
       "        await to_hello_world(msg=\"Hello world!\")\n",
       "        await asyncio.sleep(1)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "key_example = app + hello_world_with_key + bg_run\n",
    "\n",
    "md(f\"```python\\n{key_example}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "afd85e8b",
   "metadata": {},
   "source": [
    "## Run the app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ce98e25b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "Now we can run the app. Copy the code above into producer_with_key_example.py and run it with:\n",
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_with_key_example:app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_file = \"producer_with_key_example.py\"\n",
    "cmd = \"fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_with_key_example:app\"\n",
    "md(\n",
    "    f\"Now we can run the app. Copy the code above into {script_file} and run it with:\\n```shell\\n{cmd}\\n```\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cedba893",
   "metadata": {},
   "source": [
    "After running the command, you should see this output in your terminal:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0049ee5c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 347072...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 347072 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 346711...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 346711 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=22092\n",
    ") as bootstrap_server:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=key_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, output.decode(\"UTF-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "55632ae6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[347835]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'hello_every_second' as background task\n",
      "[347835]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'\n",
      "[347835]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "[347835]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'hello_every_second'\n",
      "Starting process cleanup, this may take a few seconds...\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 347835...\n",
      "[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'hello_every_second'\n",
      "[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'hello_every_second' to finish\n",
      "[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'hello_every_second'\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Process 347835 terminated.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "print(output.decode(\"UTF-8\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d4ec6dab",
   "metadata": {},
   "source": [
    "## Check if the message was sent to the Kafka topic with the desired key"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c81e65bc",
   "metadata": {},
   "source": [
    "Let's check the topic and see if there is a \"Hello world!\" message in the `hello_world` topic with the defined key. In your terminal, run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ef181f6",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "kafka-console-consumer.sh --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_extension = \".bat\" if platform.system() == \"Windows\" else \".sh\"\n",
    "consumer_cmd = f\"kafka-console-consumer{script_extension} --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\"\n",
    "md(f\"```shell\\n{consumer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "476e0e5d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 348982...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 348982 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 348216...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 348216 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 347855...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 347855 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "expected_msg = 'my_key\t{\"msg\":\"Hello world!\"}'\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=22092\n",
    ") as bootstrap_server:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=key_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, output.decode(\"UTF-8\")\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\n",
    "            \"<address_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "        ).split(\" \"),\n",
    "        pattern=expected_msg,\n",
    "        timeout=30,\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e523a7d3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "You should see *my_key\t{\"msg\": \"Hello world!\"}* messages appearing in your topic. The *my_key* part of each message is the key that we defined in our producing function."
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "md(\n",
    "    f\"You should see *{expected_msg}* messages appearing in your topic. The *my_key* part of each message is the key that we defined in our producing function.\"\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
